package com.lyon.demo.storage.client.api.core.core;

import cn.hutool.core.lang.Assert;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.util.ObjectUtil;
import com.lyon.demo.storage.client.api.core.config.DLedgerConfig;
import com.lyon.demo.storage.client.api.core.protocol.core.Role;
import com.lyon.demo.storage.common.util.Inet4Util;
import lombok.Data;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author LeeYan9
 * @since 2022-05-10
 */
@Data
public class MemberState {

    private final String group;
    private DLedgerConfig dLedgerConfig;

    /**
     * 当前任期编号
     */
    private volatile long currTerm = 0;

    /**
     * 最后以知的Leader任期编号
     */
    private volatile long ledgerEndTerm = 0;

    /**
     * 当前服务id
     */
    private final String selfId;

    private volatile Role role = Role.FOLLOWER;

    /**
     * state: -1 未知
     * state: 0 跟随者
     * state: 1 候选者
     * state: 2 领导者
     */
    private volatile int state = 0;

    /**
     * peerId1-peerAddress1;peerId2-peerAddress2;peerId3-peerAddress3;
     */
    private final String peers;

    /**
     * 同行服务地址map ； Map<PeerId,Address>
     */
    private Map<String, String> peerMap = new ConcurrentHashMap<>();

    private volatile String leaderId;

    private volatile long ledgerEndIndex = -1;
    private volatile long ledgerBeginIndex = -1;
    private volatile long committedIndex = -1;
    private volatile String currVoteFor;
    private volatile long knownMaxTermInGroup = -1;

    private static final char CLUSTER_ADDRESS_PLACEHOLDER = ';';


    public MemberState(DLedgerConfig dLedgerConfig) {
        this.dLedgerConfig = dLedgerConfig;
        this.selfId = dLedgerConfig.getSelfId();
        this.group = dLedgerConfig.getGroup();
        this.peers = dLedgerConfig.getPeers();
        for (String peerInfo : CharSequenceUtil.split(peers, CLUSTER_ADDRESS_PLACEHOLDER)) {
            String peerSelfId = peerInfo.split("-")[0];
            String peerAddress = peerInfo.substring(peerSelfId.length() + 1);
            peerMap.put(peerSelfId, peerAddress);
        }

    }

    public String getAddress(String peerId) {
        return peerMap.get(peerId);
    }

    /**
     * 获取除当前服务外的 其他服务id,连接地址
     *
     * @return Map<PeerId, Address>
     */
    public Map<String, String> getPeersWithoutSelf() {
        Map<String, String> withoutSelfPeers = ObjectUtil.clone(peerMap);
        withoutSelfPeers.remove(selfId);
        return withoutSelfPeers;
    }


    public Collection<String> getPeerIdsWithoutSelf() {
        return getPeersWithoutSelf().keySet();
    }

    public void putPeer(String peerId, String address) {
        this.getPeerMap().put(peerId, address);
    }

    public boolean isLeader() {
        return CharSequenceUtil.equals(leaderId, selfId);
    }

    public boolean isCandidate() {
        return role == Role.CANDIDATE;
    }

    public boolean isFollower() {
        return role == Role.FOLLOWER;
    }

    public boolean isQuorum(int successNum) {
        if (successNum > peerMap.size() / 2) {
            return true;
        }
        return false;
    }

    public int getPeerSize() {
        return getPeerMap().size();
    }

    public SocketAddress getPeerIp(String peerId) {
        String peerAddress = peerMap.get(peerId);
        Assert.notBlank(peerAddress, "peerId：{} 客户端地址不能为空", peerId);
        return Inet4Util.parseAddress(peerAddress);
    }

    public boolean isPeerMember(String leaderId) {
        return Objects.isNull(leaderId) || peerMap.containsKey(leaderId);
    }


    public synchronized void changeToCandidate(long term) {
        Assert.isTrue(term >= currTerm, "任期期限必须大于等于当前任期才能更新为候选者");
        if (term > knownMaxTermInGroup) {
            this.knownMaxTermInGroup = term;
        }
        this.currVoteFor = null;
        this.leaderId = null;
        this.role = Role.CANDIDATE;
    }

    public synchronized void changeToFollower(long term, String leaderId) {
        Assert.isTrue(term >= currTerm, "任期期限必须大于等于当前任期才能更新为候选者");
        if (term > knownMaxTermInGroup) {
            this.knownMaxTermInGroup = term;
        }
        this.leaderId = leaderId;
        this.role = Role.FOLLOWER;
    }

    public synchronized void changeToLeader(long term) {
        Assert.isTrue(term >= currTerm, "任期期限必须大于等于当前任期才能更新为候选者");
        if (term > knownMaxTermInGroup) {
            this.knownMaxTermInGroup = term;
        }
        this.leaderId = getSelfId();
        this.role = Role.LEADER;
    }

    public synchronized void setCurrVoteFor(String currVoteFor) {
        this.currVoteFor = currVoteFor;
        // todo 持久化 currTerm 和 投票人 ? 宕机后恢复
    }

    public synchronized long nextTerm() {
        if (currTerm < knownMaxTermInGroup) {
            currTerm = knownMaxTermInGroup;
        } else {
            ++currTerm;
        }
        // todo 持久化 currTerm 和 投票人 ? 宕机后恢复
        this.currVoteFor = null;
        return currTerm;
    }

    public InetSocketAddress getSelfAddress() {
        return (InetSocketAddress) getPeerIp(selfId);
    }

    public void updateLedgerIndex(long beginIndex, long endIndex) {
        this.ledgerBeginIndex = beginIndex;
        this.ledgerEndIndex = endIndex;
    }

    public void updateCommittedIndex(long committedIndex) {
        this.committedIndex = committedIndex;
    }
}
